package com.atguigu.flink.datastreamapi.sink;

import com.alibaba.fastjson.JSON;
import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;

/**
 * Created by Smexy on 2023/11/13
 *
 oldAPI:  流.addSink(SinkFunction x)，例如print
 newAPI:  流.sinkTo(Sink x),例如KafkaSink


 */
public class Demo2_KafkaSinkWithKey
{
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(2);

        KafkaSink<WaterSensor> kafkaSink = KafkaSink
            .<WaterSensor>builder()
            .setBootstrapServers("hadoop102:9092")
            //需要写key，key的作用用于存储元数据，或用于分区。key相同的数据，写到kafka的同一个partition
            .setRecordSerializer(
                new KafkaRecordSerializationSchema<WaterSensor>()
                {
                    @Nullable
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(WaterSensor element, KafkaSinkContext context, Long timestamp) {
                        //id作为key
                        byte[] key = element.getId().getBytes(StandardCharsets.UTF_8);
                        byte[] value = JSON.toJSONString(element).getBytes(StandardCharsets.UTF_8);
                        return new ProducerRecord<>("t3",key,value);
                    }
                }
            )
            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "200")
            .setProperty(ProducerConfig.LINGER_MS_CONFIG, "1000")
            .build();

        env
            .socketTextStream("hadoop102", 8888)
            .map(new WaterSensorMapFunction())
            .sinkTo(kafkaSink);


        try {
                            env.execute();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }

    }
}
